package com.atguigu.app.dws;

import com.atguigu.app.func.KeyWordSplitFunction;
import com.atguigu.common.Constant;
import com.atguigu.utils.KafkaUtil;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

//数据流:web/app -> Nginx -> 日志服务器(文件) -> Flume -> Kafka(ODS) -> FlinkApp -> Kafka(DWD) -> FlinkApp -> Doris
//程  序:Mock -> 日志文件 -> f1.sh -> Kafka(ZK) -> Dwd01_TrafficBaseLogSplit -> Kafka(ZK) -> Dws01_TrafficSourceKeywordPageViewWindow(CK HDFS) -> Doris
/**
 * DWS job: traffic source keyword page-view window.
 *
 * <p>Reads the DWD page-log topic from Kafka, keeps only keyword-search records,
 * splits each search phrase into individual keywords with a UDTF, counts keywords
 * per 10-second tumbling event-time window, and writes the result to Doris.
 *
 * <p>All heavy lifting is done in Flink SQL; this class only wires the pipeline.
 */
public class Dws01_TrafficSourceKeywordPageViewWindow {

    public static void main(String[] args) {

        // Must be set BEFORE any HDFS access (checkpoint storage below lives on HDFS),
        // otherwise the first checkpoint attempt runs as the OS user and fails on permissions.
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //1.1 Enable checkpointing (every 10s, exactly-once, stored on HDFS)
        env.enableCheckpointing(10000L);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointTimeout(20000L);
        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/flink-ck");
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMinPauseBetweenCheckpoints(5000L);
        // A min-pause between checkpoints implies at most one checkpoint in flight;
        // 1 (not 2) is consistent with that and with exactly-once semantics.
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        // Default restart strategy retries Integer.MAX_VALUE times; cap it at 3 with a 5s delay.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        env.setStateBackend(new HashMapStateBackend());

        //2. Register the Kafka page topic as a table; derive the event time from `ts`
        //   and declare a watermark with 2 seconds of allowed out-of-orderness.
        tableEnv.executeSql("" +
                "create table page_view(\n" +
                "    `page` map<string,string>,\n" +
                "    `ts` bigint,\n" +
                "    `rt` as TO_TIMESTAMP_LTZ(`ts`, 3),\n" +
                "    WATERMARK FOR `rt` AS `rt` - INTERVAL '2' SECOND\n" +
                ")" + KafkaUtil.getKafkaSourceDDL(Constant.TOPIC_DWD_TRAFFIC_PAGE, "dws_keyword_page_view_230315"));

        //3. Keep only keyword searches: item_type = 'keyword', coming from the search page,
        //   with a non-null search phrase.
        Table filterTable = tableEnv.sqlQuery("" +
                "select\n" +
                "    `page`['item'] item,\n" +
                "    `rt`\n" +
                "from page_view\n" +
                "where `page`['item_type'] = 'keyword'\n" +
                "and `page`['last_page_id'] = 'search'\n" +
                "and `page`['item'] is not null");
        tableEnv.createTemporaryView("filter_table", filterTable);

        //4. Register the word-split UDTF and explode each search phrase into one row per keyword.
        tableEnv.createTemporarySystemFunction("split_func", KeyWordSplitFunction.class);
        Table splitTable = tableEnv.sqlQuery("" +
                "SELECT \n" +
                "    rt, \n" +
                "    word\n" +
                "FROM filter_table, LATERAL TABLE(split_func(item))");
        tableEnv.createTemporaryView("split_table", splitTable);

        //5. Count each keyword inside 10-second tumbling event-time windows (windowing TVF).
        Table resultTable = tableEnv.sqlQuery("" +
                "SELECT \n" +
                "    DATE_FORMAT(window_start, 'yyyy-MM-dd HH:mm:ss') stt,\n" +
                "    DATE_FORMAT(window_end, 'yyyy-MM-dd HH:mm:ss') edt, \n" +
                "    word keyword,\n" +
                "    DATE_FORMAT(window_start, 'yyyy-MM-dd') cur_date,\n" +
                "    count(*) keyword_count\n" +
                "FROM TABLE(\n" +
                "    TUMBLE(TABLE split_table, DESCRIPTOR(rt), INTERVAL '10' SECOND))\n" +
                "GROUP BY \n" +
                "    word,\n" +
                "    window_start, \n" +
                "    window_end");
        tableEnv.createTemporaryView("result_table", resultTable);

        // Debug only: print windowed results to stdout instead of sinking to Doris.
//        tableEnv.sqlQuery("select * from result_table")
//                .execute()
//                .print();

        //6. Declare the Doris sink table and submit the insert job.
        //   Note: executeSql on INSERT submits the job, so no env.execute() is needed.
        tableEnv.executeSql("" +
                "create table dws_keyword_page_view(\n" +
                "    `stt`           STRING,\n" +
                "    `edt`           STRING,\n" +
                "    `keyword`       STRING,\n" +
                "    `cur_date`      STRING,\n" +
                "    `keyword_count` BIGINT\n" +
                ") with (\n" +
                "    'connector' = 'doris',\n" +
                "    'fenodes' = 'hadoop102:7030',\n" +
                "    'table.identifier' = 'gmall_230315.dws_traffic_source_keyword_page_view_window',\n" +
                "    'username' = 'root',\n" +
                "    'password' = '000000',\n" +
                "    'sink.label-prefix' = 'doris_label',\n" +
                "    'sink.enable-delete' = 'false',\n" +
                "    'sink.enable-2pc' = 'false'\n" +
                ")");
        tableEnv.executeSql("insert into dws_keyword_page_view select * from result_table");

    }

}
